In this lab, we're going to look at data streaming with Apache Spark. At the end of the lab, you should be able to:
- process a live data stream using a StreamingContext object.
Let's start by importing the packages we'll need. This week, we'll need to install the sseclient package so we can connect to the Wikipedia stream. This package is not installed on student vDesktop environments, but you can install it if you're running at home or using Docker by executing the code in the box below:
In [ ]:
!pip install sseclient
Like last week, we're going to use pyspark, a Python package that wraps Apache Spark and makes its functionality available in Python. We'll also use a few of the standard Python libraries - json, socket, threading and time - as well as the sseclient package you just installed to connect to the event stream.
Note: You don't need to understand how these packages are used to connect to the event stream, but the code is below if you're curious.
In [ ]:
import json
import pyspark
import socket
import threading
import time
from pyspark.streaming import StreamingContext
from sseclient import SSEClient
Currently, Spark supports three kinds of streaming connection out of the box: Apache Kafka, Apache Flume and Amazon Kinesis. While it's possible to connect to other kinds of streams too, doing so requires writing our own receiver code and, at present, this is unsupported in Python (although it is possible in Java and Scala). However, Spark also supports streaming data from arbitrary TCP socket endpoints, so we can instead relay the remote data stream to a local socket port to enable Spark to consume it.
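For comparison, reading from one of these built-in sources looks something like the sketch below. Treat it as a hedged illustration only: it assumes a Spark 2.x installation with the external spark-streaming-kafka-0-8 package on the classpath, a Kafka broker at localhost:9092 and a topic named recentchange, plus a StreamingContext object (ssc) like the one we create later in this lab - none of which are part of this week's exercise.
In [ ]:
# Sketch only: assumes the spark-streaming-kafka-0-8 integration package, a broker
# at localhost:9092 and a 'recentchange' topic - none of these are part of this lab.
from pyspark.streaming.kafka import KafkaUtils

kafka_stream = KafkaUtils.createDirectStream(
    ssc,                                         # a StreamingContext, created later in the lab
    ['recentchange'],                            # list of topics to subscribe to
    {'metadata.broker.list': 'localhost:9092'})  # Kafka connection parameters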
The code in the box below connects to the Wikipedia event stream and publishes its content to a local port. While you don't need to understand it to complete the lab, its basic logic is explained in the comments:
In [ ]:
def relay():
    # Connect to the Wikipedia 'recent change' event stream
    events = SSEClient('https://stream.wikimedia.org/v2/stream/recentchange')

    # Listen for a single local connection (e.g. from Spark) on port 50000
    s = socket.socket()
    s.bind(('localhost', 50000))
    s.listen(1)

    while True:
        try:
            # Wait for a client to connect, then forward each message to it,
            # one JSON document per line (socketTextStream expects UTF-8 text lines)
            client, address = s.accept()
            for event in events:
                if event.event == 'message':
                    client.sendall((event.data + '\n').encode('utf-8'))
            break
        except:
            pass  # If the client disconnects, go back and wait for a new one
        finally:
            client.close()

threading.Thread(target=relay).start()
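If you're curious, you can sanity-check the relay before involving Spark by connecting to the local port with a plain socket and printing a little of what comes back. This is just a sketch using only the standard library and the same port number as the relay above; it isn't needed for the rest of the lab.
In [ ]:
# Optional sanity check (not needed for the lab): read one chunk from the relay
test = socket.socket()
test.connect(('localhost', 50000))
print(test.recv(4096).decode('utf-8', errors='replace'))
test.close()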
Now that we have our stream relay set up, we can start to analyse its contents. First, let's initialise a SparkContext object, which will represent our connection to the Spark cluster. To do this, we must first specify the URL of the master node to connect to. As we're only running this notebook for demonstration purposes, we can just run the cluster locally, as follows:
In [ ]:
sc = pyspark.SparkContext(master='local[*]')
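As an aside, if you want to name the application or set other options, you can build the context from a SparkConf object instead. The sketch below is equivalent to the cell above (the application name is an arbitrary choice, not something the lab requires); only one SparkContext can be active at a time, so don't run both cells.
In [ ]:
# Equivalent set-up via SparkConf (the app name here is an arbitrary choice)
conf = pyspark.SparkConf().setMaster('local[*]').setAppName('wikipedia-streaming-lab')
sc = pyspark.SparkContext(conf=conf)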
Next, we create a StreamingContext object, which represents the streaming functionality of our Spark cluster. When we create the context, we must specify a batch duration (in seconds) to tell Spark how often it should process data from the stream. Let's process the Wikipedia data in batches of one second:
In [ ]:
ssc = StreamingContext(sc, 1)
Using our StreamingContext object, we can create a data stream from our local TCP relay socket with the socketTextStream method:
In [ ]:
stream = ssc.socketTextStream('localhost', 50000)
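Sockets aren't the only basic source a StreamingContext supports: it can also watch a directory and stream the contents of any new text files that appear there. The sketch below is purely illustrative - the path /tmp/wiki-events is a made-up example and the resulting stream isn't used anywhere else in this lab:
In [ ]:
# Illustration only: stream the contents of new files appearing in a directory.
# '/tmp/wiki-events' is a hypothetical path, not used elsewhere in this lab.
file_stream = ssc.textFileStream('/tmp/wiki-events')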
Even though we've created a data stream, nothing happens! Before Spark starts to consume the stream, we must first define one or more operations to perform on it. Let's count the number of edits made by different users in the last minute:
In [ ]:
users = (
    stream.map(json.loads)                  # Parse the stream data as JSON
          .map(lambda obj: obj['user'])     # Extract the values corresponding to the 'user' key
          .map(lambda user: (user, 1))      # Give each user a count of one
          .window(60)                       # Create a sliding window, sixty seconds in length
          .reduceByKey(lambda a, b: a + b)  # Reduce all key-value pairs in the window by adding values
          .transform(                       # Sort by the largest count
              lambda rdd: rdd.sortBy(lambda kv: kv[1], ascending=False))
          .pprint()                         # Print the results
)
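You can register more than one output operation before starting the context, and Spark will run them all. For example, the sketch below counts edits per wiki project over the same sixty-second window. It assumes each event's JSON includes a 'wiki' field (as the Wikimedia recentchange schema documents); it's entirely optional and the rest of the lab doesn't depend on it.
In [ ]:
# Optional extra pipeline: count edits per wiki project over the same window.
# Assumes each event carries a 'wiki' field (per the Wikimedia recentchange schema).
wikis = (
    stream.map(json.loads)
          .map(lambda obj: (obj.get('wiki', 'unknown'), 1))  # Key each edit by its wiki
          .window(60)                                        # Same sixty-second window
          .reduceByKey(lambda a, b: a + b)                   # Total edits per wiki
          .pprint()                                          # Print the results
)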
Finally, we start the streaming context, let it run for two minutes and then stop it:
In [ ]:
ssc.start()      # Start consuming the stream
time.sleep(120)  # Let it run for two minutes
ssc.stop()       # Stop streaming (and, by default, the underlying SparkContext)
As can be seen, Spark counts the number of edits made by each user in the past sixty seconds and emits updates once per second (the original batch duration of the StreamingContext).
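Rather than sleeping on the driver, you can also ask the streaming context itself to wait. The sketch below is an equivalent way to run the job for two minutes using awaitTerminationOrTimeout; note that a stopped StreamingContext can't be restarted, so you'd need to recreate the context, the stream and its operations before running this after the cell above.
In [ ]:
# Alternative to time.sleep: block until two minutes have elapsed (or the context
# is stopped from elsewhere), then shut down. Requires a freshly created context.
ssc.start()
ssc.awaitTerminationOrTimeout(120)
ssc.stop()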